package com.atguigu.dga.assess.assessor.quality;

import com.atguigu.dga.assess.assessor.AssessorTemplate;
import com.atguigu.dga.assess.bean.AssessParam;
import com.atguigu.dga.assess.bean.GovernanceAssessDetail;
import com.atguigu.dga.config.MetaConstant;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.stereotype.Component;

import java.io.IOException;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.util.ArrayList;
import java.util.List;

/**
 * Created by Smexy on 2023/11/2

 必须日分区表
 前一天数据产出的数据量，超过前x天平均产出量{upper_limit}% ，或低于{lower_limit}%  ，则给0分，其余10分
 */
@Component("TABLE_PRODUCT_VOLUME_MONITOR")
public class CheckTableProduceVolume extends AssessorTemplate
{
    @Autowired
    private ApplicationContext context;
    /*
        1.过滤日分区表
        2.排除特殊表 dim_date ,dim_date_tmp
        3.统计 当天 和 前n天，表的分区目录中的总数据量(递归)
        4.计算 当天的数据量 和 前n天的平均数据量
        5.对比

     */
    @Override
    protected void assess(AssessParam param, GovernanceAssessDetail detail) throws Exception {

        //过滤
        String lifecycleType = param.getMetaInfo().getTableMetaInfoExtra().getLifecycleType();
        if (!MetaConstant.LIFECYCLE_TYPE_DAY.equals(lifecycleType)) {
            return ;
        }
        //取参数
        Integer uLPercent = getIntegerValueFromConfig(param, "upper_limit");
        Integer lLPercent = getIntegerValueFromConfig(param, "lower_limit");
        Integer days = getIntegerValueFromConfig(param, "days");
        //获取到表所存放的hdfs的目录
        String tableFsPath = param.getMetaInfo().getTableFsPath();
        //获取客户端
        FileSystem hdfs = context.getBean(FileSystem.class);
        //计算 从当前的考评日期 到 前n天，表目录中相关分区的数据量
        //String assessDate = param.getAssessDate();
        //为了测试
        String assessDate = "2022-06-10";

        String assessPartitionDt = LocalDate.parse(assessDate).minusDays(1).toString();
        List<PartitionDataVolume> result = statsPartitionDataVolume(hdfs, assessDate, days, tableFsPath);

        //关闭客户端
        hdfs.close();

        //判断，如果只有今天一天的导数，没有历史导数，退出
        if (result.size() <= 1){
            return ;
        }

        //获取今天导数的数据量
        Long assessDataVolume = result.get(0).getVolume();
        //统计过去x天的平均数据量
        double avgDataVolume = result.stream()
                                .filter(d -> !assessPartitionDt.equals(d.getDt()))
                                .mapToLong(d -> d.getVolume())
                                .average()
                                .getAsDouble();
        //计算阈值
        BigDecimal upper = BigDecimal.valueOf(avgDataVolume).multiply(BigDecimal.valueOf(100 + uLPercent)).movePointLeft(2);
        BigDecimal lower = BigDecimal.valueOf(avgDataVolume).multiply(BigDecimal.valueOf(100 - lLPercent)).movePointLeft(2);

        //对比
        if (BigDecimal.valueOf(assessDataVolume).compareTo(upper) == 1
            ||
            BigDecimal.valueOf(assessDataVolume).compareTo(lower) == -1
        ){
            String template = "当前表在今天导数的分区的产生数据量是: %d ,超过了过去 %d 天的平均值: %f 的 上限: %d%% 或 低于下限: %d%% ";
            String comment = String.format(template,assessDataVolume,days,avgDataVolume,uLPercent,lLPercent);
            assessScore(BigDecimal.ZERO,"表产出数据量异常",comment,detail,false,null);
        }

    }

    /*
        注意： 2023-05-26是调度时间，计算的数据是2023-05-25日的数据。

        举例：
            考评日期是 2023-05-26
            days = 3.
              取 assessDate 2023-05-26， 获取的分区  dt=2023-05-25
              取 assessDate 2023-05-25， 获取的分区  dt=2023-05-24
              取 assessDate 2023-05-24， 获取的分区  dt=2023-05-23
              取 assessDate 2023-05-23， 获取的分区  dt=2023-05-22

        取 days + 1 天的统计数据。
     */
    private List<PartitionDataVolume> statsPartitionDataVolume(FileSystem hdfs, String assessDate,
                                                               Integer days, String tableFsPath) throws Exception {
        List<PartitionDataVolume> result = new ArrayList<>();

        for (Integer i = 0; i <= days; i++) {
            //获取要计算的日期  日期要随着i递减
            String dt = LocalDate.parse(assessDate).minusDays(1 + i).toString();
            //分区目录
            Path partitionPath = new Path(tableFsPath, "dt=" + dt);
            //判断分区目录是否存在
            if (hdfs.exists(partitionPath)){
                //使用客户端列出目录中所有的子文件
                FileStatus[] fileStatuses = hdfs.listStatus(partitionPath);
                //编写方法递归统计目录中所有文件的数据量
                PartitionDataVolume partitionDataVolume = new PartitionDataVolume(dt, 0l);
                statDirDataSize(fileStatuses,hdfs,partitionDataVolume);
                result.add(partitionDataVolume);
            }
        }

        return result;

    }

    private void statDirDataSize(FileStatus[] fileStatuses, FileSystem hdfs, PartitionDataVolume partitionDataVolume) throws IOException {
        for (FileStatus fileStatus : fileStatuses) {
            if (fileStatus.isFile()){
                partitionDataVolume.setVolume(
                    partitionDataVolume.getVolume() + fileStatus.getLen()
                );
            }else {
                //是目录，继续向下遍历  列出当前目录的子目录
                FileStatus[] subFileStatus = hdfs.listStatus(fileStatus.getPath());
                statDirDataSize(subFileStatus,hdfs,partitionDataVolume);
            }
        }
    }

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    private class PartitionDataVolume{
        private String dt;
        private Long volume;
    }
}
